En este documento se realizan varias consultas a la base de datos de Mongodb a través de python. En este documento no se ha incluido la creación del dataset en mongodb desde python, ni la parte de limpieza de datos, ya que está hecho en la primera entrega en este fichero https://github.com/JTrillo/DataScience2/blob/master/Databases%20docs/MongoDB/insert_incidents.py
from bson import SON
import pymongo
from pymongo import MongoClient, GEOSPHERE
from datetime import datetime
import pandas as pd
import folium
from folium import plugins
import cufflinks as cf
cf.set_config_file(world_readable=True,offline=True)
def get_session(db_name):
"""
:param db_name: Nombre de la base de datos de los incidentes y los distritos
:return: Objeto de tipo DataBase con una conexión a la base de datos local
"""
client = MongoClient()
db = client[db_name]
return db
def create_indexes(db):
"""
GEOSPHERE: para procesar coordinadas esféricas
:param db: Añade un Ãndice sobre los campos que contienen información geoespacial
"""
db.incidents.create_index([("Location", GEOSPHERE)])
db.neighbours.create_index([("the_geom", GEOSPHERE)])
db.incidents.create_index([("Date", pymongo.ASCENDING)])
db.incidents.create_index([("Category", pymongo.ASCENDING)])
def first_query(db):
"""
:param db: referencia a la sesión de la bd
:return: Esta función obtiene los incidentes que están a una distancia máximo de 1000 metros
desde un punto representado por coordinadas geográficas en formato geojson
"""
# Montamos la query
query_incidents = {
"Location" :{
"$near": {
"$geometry" : SON([
("type", "Point"),
("coordinates", [-122.42158168136999, 37.7617007179518])
]),
"$maxDistance": 1000
}
}
}
# Ejecutamos la querry sobre la colección de incidencias
query_results = db.incidents.find(query_incidents)
df = pd.DataFrame(list(query_results))
return df
def second_query(db):
"""
:param db: referencia a la sesión de la bd
:return: devuelve el distrito que contiene las coordinadas usadas en el
operado de intersección
"""
# Montamos la query
query_distrito = {"the_geom":
{"$geoIntersects":
{"$geometry": SON([
("type", "Point"),
("coordinates", [-122.42158168136999, 37.7617007179518])])
}
}
}
# Ejecutamos la querry sobre la colección de incidencias
query_results = db.neighbours.find_one(query_distrito)
return query_results
def third_query(db):
"""
:param db: referencia a la sesión de la bd
:return: Devuelve el todos los incidentes, en dataframe, de un distrito, usando
operadores geo-espaciales, la consulta se realiza en fases sobre las dos
colecciones, primero encontramos el distrito, y luego buscamos todos los
incidentes que tienen las coordinadas dentro del polÃgono
"""
# Empezamos con el distrito
query_distrito = {"the_geom": {"$geoIntersects": {
"$geometry": SON([("type", "Point"), ("coordinates", [-122.42158168136999, 37.7617007179518])])}}}
distrito = db.neighbours.find_one(query_distrito)
# Ahora encontramos los incidentes
query_incidents = {"Location": {"$geoWithin": {"$geometry": distrito['the_geom']}}}
query_results = db.incidents.find(query_incidents)
df = pd.DataFrame(list(query_results))
return df
def since_february(db):
"""
:param db: referencia a la sesión de la bd
:return: Devuelve los incidentes que han ocurrido desde febrero de 2018
"""
fecha = datetime(2018, 2, 1)
incidents = db.incidents.find({"Date": {"$gt": fecha}})
df = pd.DataFrame(list(incidents))
return df
def date_querry(op, sdate, edate):
"""
:param opr: operador B: between dates, L: less than, G: greater than
:return: devuelve el filtro de la consulta según lo que se recibe por parámetro
en op
"""
switcher = {
'B': {"Date": {"$gte": sdate, "$lte": edate}}, # Between
'GE': {"Date": {"$gte": sdate}}, # Greater than or equal
'LE': {"Date": {"$lte": sdate}}, # less than or equal
}
return switcher.get(op)
def generic_date_search(db, op, sdate, *args, **kwargs):
"""
:param db: referencia a la sesión de bd
:param op: operador para seleccionar
:param sdate: primera fecha, mandaterio
:param args:
:param kwargs: Contiene argumento opcional de la seguna fecha
:return: incidentes que cumplen con el filtro sobre fechas
"""
edate = kwargs.get('edate', None)
if not edate is None:
query = date_querry(op, sdate, edate)
else:
query = date_querry(op, sdate, None)
query_results = db.incidents.find(query)
df = pd.DataFrame(list(query_results))
return df
def draw_map(ds):
"""
Esta función recibe un conjunto de incidentes y los dibuja en un mapa usando el paquete folium,
el mapa se guarda en un fichero.
:param ds: dataset de incidentes en formato de DataFrame
"""
incid_map = folium.Map(location=[37.7617007179518, -122.42158168136999], zoom_start=11, tiles='Stamen Terrain')
marker_cluster = plugins.MarkerCluster().add_to(incid_map)
for name, row in ds.iterrows():
folium.Marker([row["Y"], row["X"]], popup=row["Descript"]).add_to(marker_cluster)
incid_map.save('incidents.html')
return incid_map
def draw_heatmap(df):
"""
Esta función recibe un conjunto de incidentes y los dibuja en un mapa de calor que
se guarda en un fichero html
:param ds: dataset de incidentes en formato de DataFrame
"""
heat_map = folium.Map(location=[37.7617007179518, -122.42158168136999], zoom_start=11, tiles='Stamen Terrain')
heat_map.add_child(plugins.HeatMap([[row["Y"], row["X"]] for name, row in df.iterrows()]))
heat_map.save('heat_map_incidets.html')
return heat_map
def total_per_category(db):
"""
Esta función usa el framework de aggregate para obtener el total de cada categorÃa
:param db: referencia a la sesión de bd
:return: Dataframe composed of two columns, categories and count of each category
"""
#Agrupamos por categorÃa y los contamos, y luego ordenamos
pipeline = [
{"$group": {"_id": "$Category", "count": {"$sum": 1}}},
{"$sort": SON([("count", -1), ("_id", -1)])}
]
aggregate_results = db.incidents.aggregate(pipeline)
return pd.DataFrame(list(aggregate_results))
def total_per_hour(db):
"""
Esta función usa el framework de aggregate para obtener el total de incidentes de cada hora
:param db: referencia a la sesión de bd
:return: Dataframe composed of two columns, hours of the day and count of total incidents in that hour
"""
# Primero proyectamos cada documento en otro sacando solo el _id y la hora
pipeline = [
{"$project": { "h": { "$hour": "$Date" } }}, # nuevo campo h que ha sido proyecto desde la fecha
{"$group": {"_id": "$h", "count": {"$sum": 1}}},
{"$sort": SON([("_id", 1), ("count", 1)])}
]
aggregate_results = db.incidents.aggregate(pipeline)
df = pd.DataFrame(list(aggregate_results))
return df
sfdb = get_session("san_francisco_incidents") # Creamos una sesión
create_indexes(sfdb) # Nos aseguramos de que estén los Ãndices
fq = first_query(sfdb) # Consulta geoespacial con operador $near
fq.head()
sq = second_query(sfdb) # Consulta geoespacial con el operador $geoIntersects
tq = third_query(sfdb) # Consulta geoespacial
# Fechas para filtro de fechas
fecha1 = datetime(2017, 12, 1)
fecha2 = datetime(2017, 12, 31)
# Buscaremos los incidentes entre dos fechas, B=Between (ver doc de la función)
date_results = generic_date_search(sfdb,'B', fecha1, edate=fecha2)
# Buscamos todos los incidentes de febrero
feb = since_february(sfdb)
# Generamos un mapa con los miles primeros incidentes
m = draw_map(feb.iloc[:1000])
hm = draw_heatmap(date_results.iloc[:1000])
m
hm
# Obtenemos el total de incidentes por categorÃa y lo mostramos en un mapa de tipo PIE
df = total_per_category(sfdb)
df
df.iplot(kind="pie", labels="_id", values="count")
# Obtenemos el total de incidentes por cada hora y lo mostramos en un diagrama de barras
dfh = total_per_hour(sfdb)
dfh = dfh.rename(index=str, columns={"_id": "Hour", "count": "Total Incidents"})
dfh
dfh.iplot(kind='bar', filename='cufflinks/bar-chart-row')